feat: latest_event_at in stream info #1409
Merged
merged 8 commits
Aug 20, 2025

Conversation

nikhilsinhaparseable

@nikhilsinhaparseable nikhilsinhaparseable commented Aug 20, 2025

add new field latest_event_at in stream info
can be fetched from the API: api/prism/v1/logstream/{name}/info

refactor the first_event_at fetch as well

optimise retention action and retention cleanup across all live ingestors; remove the fetch of first_event_at from retention cleanup

Summary by CodeRabbit

  • New Features

    • Stream info now includes latestEventAt; storage fetch returns first and latest timestamps in one call.
    • Home dataset entries may include timePartition; home response includes datasets and topFiveIngestion.
    • New internal dataset-stats stream ("pstats") emits per-record events with file-derived timestamps.
  • Refactor

    • Retention cleanup is orchestrated cluster-wide and runs concurrently.
    • Stream info requests degrade gracefully on storage errors instead of failing.
  • Breaking change (API)

    • StreamInfo and several responses now use camelCase JSON and include latestEventAt.
    • Retention cleanup endpoint returns 204 No Content on success.
    • Some stream helper APIs were removed or changed; event creation now requires an explicit timestamp.
  • Bug Fixes

    • Deletion aborts on snapshot-update failures to avoid inconsistent state.


coderabbitai bot commented Aug 20, 2025

Walkthrough

Removes memory-first first-event helpers, replaces single-timestamp storage API with a dual first+latest API, migrates retention cleanup to cluster-level concurrent dispatch, simplifies upload/stats flows, and adds latest_event_at plus camelCase serialization to StreamInfo.

Changes

Cohort / File(s) Summary
Catalog retention orchestration
src/catalog/mod.rs
Removed get_first_event; changed remove_manifest_from_snapshot to return (); updates snapshot (filter manifests, update deleted stats, clear first_event_at, persist); dispatches retention cleanup via cluster::for_each_live_ingestor concurrently.
Cluster HTTP request API
src/handlers/http/cluster/mod.rs
send_retention_cleanup_request now accepts dates: &[String] and returns Result<(), ObjectStorageError>; removed pre-call liveness check; POSTs JSON dates and logs non-success response bodies.
HTTP logstream info (server)
src/handlers/http/logstream.rs
get_stream_info now calls storage.get_first_and_latest_event_from_storage; returns latest_event_at in StreamInfo; on storage error logs warning and defaults to (None, None); no in-memory first_event_at updates.
Ingestor retention handler
src/handlers/http/modal/ingest/ingestor_logstream.rs
Use PARSEABLE.storage().get_object_store() accessor; handle remove_manifest_from_snapshot errors explicitly (return StreamError::Custom 500); on success return 204 No Content (no first_event_at).
Legacy helpers removed
src/logstream/mod.rs
Deleted file and public helpers get_stream_schema_helper and get_stream_info_helper.
Prism logstream info
src/prism/logstream/mod.rs
Replace memory-first timestamp logic with get_first_and_latest_event_from_storage; populate latest_event_at; on storage error log warning and default to (None, None).
Storage public model (serialization)
src/storage/mod.rs
Added pub latest_event_at: Option<String> to StreamInfo and applied #[serde(rename_all = "camelCase")] to the struct (serialized field names now camelCase).
Object storage trait & manifest helpers
src/storage/object_storage.rs
Replaced get_first_event_from_storage with get_first_and_latest_event_from_storage -> Result<(Option<String>, Option<String>), ...>; added load_manifest_from_path, extract_timestamps_from_manifest, extract_timestamp_from_manifest; extract min/max timestamps from manifest stats; simplified upload types and removed per-upload stats_calculated/stats sync flows.
Retention flow control
src/storage/retention.rs
action::delete now aborts on snapshot-update error (logs and returns early); removed post-removal first_event_at update logic.
Home API & models
src/prism/home/mod.rs
Added time_partition: Option<String> to DataSet; HomeResponse now includes datasets and top_five_ingestion; applied camelCase serde to response models and propagate time_partition from stream metadata.
Alerts serialization
src/alerts/alert_structs.rs
Added #[serde(rename_all = "camelCase")] to AlertsSummary/AlertsInfoByState/AlertsInfo and removed per-field rename for not_triggered.
Event timestamp API
src/event/format/json.rs, src/connectors/kafka/processor.rs, src/handlers/http/ingest.rs
Event constructor now requires explicit DateTime<Utc> parameter; callers updated to pass Utc::now() (and tests adjusted).
Field-stats and dataset stats stream
src/storage/field_stats.rs, src/utils/json/mod.rs, src/utils/mod.rs
Added DATASET_STATS_STREAM_NAME constant and per-record event emission using timestamps extracted from parquet path; made apply_generic_flattening_for_partition public; removed duplicate DATASET_STATS_STREAM_NAME in utils.
Minor accessor changes
src/migration/mod.rs, other callers
Switched usages from PARSEABLE.storage.get_object_store() to PARSEABLE.storage().get_object_store() (method accessor).
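
To make the StreamInfo serialization change summarized above concrete, here is a minimal sketch of the affected fields, assuming the Option fields keep the existing skip-when-None behavior (the real struct carries many more fields):

use serde::Serialize;

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamInfo {
    pub created_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub first_event_at: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latest_event_at: Option<String>,
    // ...remaining fields elided
}

// serializes as {"createdAt": "...", "firstEventAt": "...", "latestEventAt": "..."}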

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Admin as Caller
  participant Catalog as catalog::remove_manifest_from_snapshot
  participant Cluster as cluster::for_each_live_ingestor
  participant Ingestor as ingestor_http.retention_cleanup

  Admin->>Catalog: remove_manifest_from_snapshot(stream, dates)
  Catalog->>Catalog: update snapshot (filter manifests, update deleted stats, clear first_event_at, persist)
  Catalog->>Cluster: for_each_live_ingestor(concurrent)
  par For each live ingestor
    Cluster->>Ingestor: POST /retention/cleanup {dates}
    Ingestor-->>Cluster: 200 OK or error
  end
  Cluster-->>Catalog: All tasks complete
  Catalog-->>Admin: Ok(())
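As an illustration of the concurrent fan-out, the sketch below uses a tokio JoinSet rather than the repository's for_each_live_ingestor helper (whose exact signature isn't shown here), and reuses the send_retention_cleanup_request signature from this PR; the URL construction is hypothetical:

use tokio::task::JoinSet;

async fn dispatch_cleanup(
    ingestors: Vec<IngestorMetadata>, // hypothetical: the set of live ingestors
    dates: Vec<String>,
) -> Result<(), ObjectStorageError> {
    let mut tasks = JoinSet::new();
    for ingestor in ingestors {
        let dates = dates.clone();
        tasks.spawn(async move {
            // hypothetical URL; the real handler POSTs the dates as JSON to the ingestor
            let url = format!("{}/retention/cleanup", ingestor.domain_name);
            send_retention_cleanup_request(&url, ingestor, &dates).await
        });
    }
    // propagate the first failure; remaining tasks are dropped with the JoinSet
    while let Some(joined) = tasks.join_next().await {
        joined.expect("cleanup task panicked")?;
    }
    Ok(())
}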
sequenceDiagram
  autonumber
  actor Client
  participant HTTP as handlers/http/logstream.get_stream_info
  participant Storage as ObjectStorage

  Client->>HTTP: GET /streams/{name}/info
  HTTP->>Storage: get_first_and_latest_event_from_storage(name)
  alt Success
    Storage-->>HTTP: (first?, latest?)
  else Error
    Storage-->>HTTP: Err
    HTTP->>HTTP: set (None, None), log warn
  end
  HTTP-->>Client: StreamInfo { ..., first_event_at, latest_event_at, ... }
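In code, the fallback branch of this flow is small; a minimal sketch, assuming get_first_and_latest_event_from_storage returns Result<(Option<String>, Option<String>), ObjectStorageError> as described in the walkthrough:

let (first_event_at, latest_event_at) = match storage
    .get_first_and_latest_event_from_storage(&stream_name)
    .await
{
    Ok(bounds) => bounds,
    Err(err) => {
        // best-effort fields: degrade to (None, None) instead of failing the request
        warn!("failed to fetch event timestamps for {stream_name}: {err}");
        (None, None)
    }
};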
sequenceDiagram
  autonumber
  participant Obj as ObjectStorage
  participant Snap as Snapshot/Manifests
  participant Helper as Manifest Helpers

  Obj->>Snap: collect manifest_list from stream metadata
  alt No manifests
    Snap-->>Obj: []
    Obj-->>Obj: return (None, None)
  else Manifests exist
    Obj->>Helper: find manifest with min(time_lower_bound) and max(time_upper_bound)
    alt Same manifest
      Obj->>Helper: load_manifest_from_path(path)
      Helper-->>Obj: Manifest
      Obj->>Helper: extract_timestamps_from_manifest(manifest, partition_col)
      Helper-->>Obj: (min_ts?, max_ts?)
    else Different manifests
      Obj->>Helper: extract_timestamp_from_manifest(min_path, col, find_min=true)
      Obj->>Helper: extract_timestamp_from_manifest(max_path, col, find_min=false)
      Helper-->>Obj: min_ts? / max_ts?
    end
    Obj-->>Obj: format RFC3339 strings
    Obj-->>Obj: return (first?, latest?)
  end
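The guarded conversions that the extraction step relies on can be sketched as two small helpers (names and signatures here are illustrative, not the repo's actual functions):

use chrono::{DateTime, SecondsFormat, Utc};

// Integer column stats: only accept values that form a valid millisecond timestamp.
fn millis_to_rfc3339(millis: i64) -> Option<String> {
    DateTime::from_timestamp_millis(millis)
        .map(|ts| ts.to_rfc3339_opts(SecondsFormat::Millis, true))
}

// String column stats: only accept values that parse as RFC3339, normalized to UTC.
fn parse_rfc3339_utc(value: &str) -> Option<DateTime<Utc>> {
    DateTime::parse_from_rfc3339(value)
        .ok()
        .map(|ts| ts.with_timezone(&Utc))
}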

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

for next release

Suggested reviewers

  • de-sh

Poem

I hop through manifests, tidy and fleet,
Pair first with latest, keep timelines neat.
I ping each burrow across the cluster trail,
Trim old snapshots, then nibble a snail. 🐇✨



@nikhilsinhaparseable nikhilsinhaparseable marked this pull request as ready for review August 20, 2025 04:30

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/http/cluster/mod.rs (1)

599-628: Non-success HTTP responses are swallowed — propagate as errors

Currently, a non-2xx response logs the body but still returns Ok(()). Callers (e.g., catalog::remove_manifest_from_snapshot) will treat failed ingestor cleanups as success, leading to inconsistent cleanup across the cluster.

Apply this to fail fast on non-success:

 pub async fn send_retention_cleanup_request(
     url: &str,
     ingestor: IngestorMetadata,
     dates: &[String],
 ) -> Result<(), ObjectStorageError> {
@@
-    if !resp.status().is_success() {
-        let body = resp.text().await.unwrap_or_default();
-        error!(
-            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
-            ingestor.domain_name, body
-        );
-    }
-
-    Ok(())
+    if !resp.status().is_success() {
+        let status = resp.status();
+        let body = resp.text().await.unwrap_or_default();
+        error!(
+            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
+            ingestor.domain_name, body
+        );
+        return Err(ObjectStorageError::Custom(format!(
+            "retention cleanup failed on ingestor {} with status {}: {}",
+            ingestor.domain_name, status, body
+        )));
+    }
+
+    Ok(())
 }
🧹 Nitpick comments (2)
src/handlers/http/modal/ingest/ingestor_logstream.rs (1)

65-65: Prefer 204 No Content over 200 "Cleanup complete"

The response body isn’t consumed by callers; returning 204 reduces noise and aligns with "action completed, no payload".

-    Ok(("Cleanup complete", StatusCode::OK))
+    Ok(actix_web::HttpResponse::NoContent().finish())
src/prism/logstream/mod.rs (1)

155-155: Use PARSEABLE.storage() accessor for consistency (and to avoid depending on field visibility).

Prefer the public accessor to obtain the storage provider, matching the rest of the codebase.

-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between e7d7217 and 87f499e.

📒 Files selected for processing (9)
  • src/catalog/mod.rs (3 hunks)
  • src/handlers/http/cluster/mod.rs (2 hunks)
  • src/handlers/http/logstream.rs (2 hunks)
  • src/handlers/http/modal/ingest/ingestor_logstream.rs (1 hunks)
  • src/logstream/mod.rs (0 hunks)
  • src/prism/logstream/mod.rs (2 hunks)
  • src/storage/mod.rs (1 hunks)
  • src/storage/object_storage.rs (2 hunks)
  • src/storage/retention.rs (1 hunks)
💤 Files with no reviewable changes (1)
  • src/logstream/mod.rs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/storage/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/prism/logstream/mod.rs
  • src/handlers/http/logstream.rs
🧬 Code Graph Analysis (7)
src/handlers/http/modal/ingest/ingestor_logstream.rs (1)
src/catalog/mod.rs (1)
  • remove_manifest_from_snapshot (454-495)
src/storage/retention.rs (1)
src/catalog/mod.rs (1)
  • remove_manifest_from_snapshot (454-495)
src/catalog/mod.rs (2)
src/handlers/http/mod.rs (1)
  • base_path_without_preceding_slash (79-81)
src/handlers/http/cluster/mod.rs (2)
  • for_each_live_ingestor (76-110)
  • send_retention_cleanup_request (596-628)
src/prism/logstream/mod.rs (1)
src/parseable/mod.rs (1)
  • storage (244-246)
src/storage/object_storage.rs (1)
src/query/stream_schema_provider.rs (1)
  • manifest_items (1032-1059)
src/handlers/http/logstream.rs (1)
src/parseable/mod.rs (1)
  • storage (244-246)
src/handlers/http/cluster/mod.rs (1)
src/handlers/http/modal/mod.rs (2)
  • domain_name (573-573)
  • domain_name (580-582)
🔇 Additional comments (12)
src/storage/mod.rs (1)

139-141: latest_event_at addition is correct and backward-compatible

Serde rename to "latest-event-at" and skip-when-None match existing conventions (e.g., created-at, first-event-at). Safe for clients.

If you maintain OpenAPI/SDKs, please ensure latest-event-at is documented and exposed to clients.

src/handlers/http/modal/ingest/ingestor_logstream.rs (1)

54-63: Good error propagation for retention cleanup

On failure, converting to StreamError::Custom with 500 and a descriptive message is appropriate and consistent with other handlers.

src/storage/retention.rs (1)

197-205: Abort delete on snapshot-update failure — good call

Short-circuiting delete when remove_manifest_from_snapshot fails prevents snapshot/data divergence. The error message is clear and actionable.

src/handlers/http/logstream.rs (1)

338-351: Fetching first/latest timestamps with a single storage call and graceful fallback looks good

Consolidating to get_first_and_latest_event_from_storage reduces I/O. On error, returning (None, None) with a warn avoids failing the endpoint. Including latest_event_at in StreamInfo is wired correctly.

Also applies to: 365-365

src/prism/logstream/mod.rs (2)

157-170: Good: single storage round-trip with graceful fallback.

Fetching both first/latest timestamps in one call and defaulting to (None, None) on failure is clean and keeps the happy-path simple. The warning is sufficient for observability.


180-185: Verify API contract and client updates for latest_event_at

I didn’t find any OpenAPI/Swagger definitions or client-side schemas in the repo that include the new latest-event-at field. Please make sure:

• Your OpenAPI (or Swagger) spec exposes the latest-event-at property on StreamInfo
• Any generated clients or UI consumers pull in and render this new field

src/catalog/mod.rs (2)

458-471: Snapshot pruning + deleted stats update LGTM; clearing first_event_at fits new flow.

  • Removing matching manifest entries via retain is straightforward.
  • Calling update_deleted_stats before persisting snapshot is correct.
  • Resetting both in-memory and persisted first_event_at aligns with the new storage-driven timestamp fetch.

494-495: Return Ok(()) on completion: LGTM.

Return type change aligns with the new cluster-wide cleanup flow that no longer returns first_event_at.

src/storage/object_storage.rs (4)

75-76: Good fallback to DEFAULT_TIMESTAMP_KEY.

Using DEFAULT_TIMESTAMP_KEY when no time partition is set is consistent with prior behavior.


929-961: Robust manifest loading by absolute URL: LGTM.

Using the query runtime’s object_store_registry keyed by self.store_url() is appropriate for fetching arbitrary absolute manifest paths.


1019-1035: Helper wrapper: clear and reusable.

extract_timestamp_from_manifest cleanly composes the two helpers.


844-928: Efficient dual-timestamp retrieval from manifests: LGTM; old API fully retired.
Verification confirms no remaining references to get_first_event_from_storage in the codebase—safe to remove the old API.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)

1135-1174: Don’t delete staged files on failed upload; leave them for retry.

When manifest_file is None, this code removes the staged file. Prior guidance for this codebase is to keep failed parquet files in staging for retry in the next sync cycle. Even if the current implementation always sets manifest_file: Some(...) on success and returns Err on failure, this branch is a footgun if that assumption ever changes.

Recommend removing the cleanup in the None case and only deleting files after they’re included in manifest_files.

-                if let Some(manifest_file) = upload_result.manifest_file {
-                    uploaded_files.push((upload_result.file_path, manifest_file));
-                } else {
-                    // File failed to upload, clean up
-                    if let Err(e) = remove_file(upload_result.file_path) {
-                        warn!("Failed to remove staged file: {e}");
-                    }
-                }
+                if let Some(manifest_file) = upload_result.manifest_file {
+                    uploaded_files.push((upload_result.file_path, manifest_file));
+                } else {
+                    // Keep failed files in staging for retry in next sync cycle.
+                    // Intentionally no cleanup here.
+                }
🧹 Nitpick comments (12)
src/migration/mod.rs (1)

369-369: Standardize storage accessor usage for consistency

Switched to PARSEABLE.storage().get_object_store(), which is correct per the new accessor. Elsewhere in this file we still use config.storage.get_object_store(). Consider standardizing on the accessor to avoid confusion and keep a single pattern across modules.

src/handlers/http/modal/ingest/ingestor_logstream.rs (1)

41-52: Prefer using check_or_load_stream for mode-aware existence checks

This endpoint currently re-implements an existence check using streams.contains + create_stream_and_schema_from_storage. We already have a helper that handles mode differences and storage loading: PARSEABLE.check_or_load_stream(&stream_name). It improves readability and keeps logic consistent with other handlers.

Suggested change:

-    // if the stream not found in memory map,
-    //check if it exists in the storage
-    //create stream and schema from storage
-    if !PARSEABLE.streams.contains(&stream_name)
-        && !PARSEABLE
-            .create_stream_and_schema_from_storage(&stream_name)
-            .await
-            .unwrap_or(false)
-    {
-        return Err(StreamNotFound(stream_name.clone()).into());
-    }
+    // Ensure parseable is aware of stream in distributed mode
+    if !PARSEABLE.check_or_load_stream(&stream_name).await {
+        return Err(StreamNotFound(stream_name.clone()).into());
+    }
src/storage/mod.rs (1)

133-141: Only server-side serialization of StreamInfo detected – no deserialization occurs

A search for serde_json::from_*<StreamInfo> or web::Json<StreamInfo> returned no matches. That means the service only emits StreamInfo (now in camelCase) and never consumes it.

• Document this breaking change for any API consumers.
• If in the future you need to accept incoming snake_case payloads, consider adding serde aliases on the key fields in src/storage/mod.rs:

#[serde(alias = "created_at")]
pub created_at: String,
#[serde(alias = "first_event_at")]
pub first_event_at: Option<String>,
#[serde(alias = "latest_event_at")]
pub latest_event_at: Option<String>,
src/handlers/http/cluster/mod.rs (1)

599-632: Retention cleanup API: signature and behavior changes look good; include status in error for better diagnostics

Returning Result<(), ObjectStorageError> and accepting dates: &[String] fits the new orchestration model. Minor improvement: include the HTTP status in logs/errors to aid debugging.

-    if !resp.status().is_success() {
-        let body = resp.text().await.unwrap_or_default();
+    if !resp.status().is_success() {
+        let status = resp.status();
+        let body = resp.text().await.unwrap_or_default();
         error!(
-            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
-            ingestor.domain_name, body
+            "failed to perform cleanup on retention: {} (status: {})\nResponse Returned: {:?}",
+            ingestor.domain_name, status, body
         );
-        return Err(ObjectStorageError::Custom(format!(
-            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
-            ingestor.domain_name, body
-        )));
+        return Err(ObjectStorageError::Custom(format!(
+            "failed to perform cleanup on retention: {} (status: {})\nResponse Returned: {:?}",
+            ingestor.domain_name, status, body
+        )));
     }
src/handlers/http/logstream.rs (2)

336-351: Single-call fetch for first/latest timestamps: solid; consider caching to reduce storage reads

Using storage.get_first_and_latest_event_from_storage is a clear improvement and aligns with the new StreamInfo.latest_event_at. One consideration: this now hits object storage on every info request. If this endpoint is hot, consider caching these values on the stream metadata (best-effort) and/or adding lightweight memoization with short TTL to reduce storage I/O. Optional and can be a follow-up.
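
If memoization is pursued, a minimal sketch of a TTL cache along those lines (type and method names here are hypothetical, not existing Parseable APIs):

use std::collections::HashMap;
use std::sync::RwLock;
use std::time::{Duration, Instant};

type EventBounds = (Option<String>, Option<String>);

// Best-effort cache of (first_event_at, latest_event_at) per stream.
pub struct EventBoundsCache {
    ttl: Duration,
    entries: RwLock<HashMap<String, (Instant, EventBounds)>>,
}

impl EventBoundsCache {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entries: RwLock::new(HashMap::new()) }
    }

    // Return a cached value only while it is younger than the TTL.
    pub fn get(&self, stream: &str) -> Option<EventBounds> {
        self.entries
            .read()
            .unwrap()
            .get(stream)
            .filter(|(fetched_at, _)| fetched_at.elapsed() < self.ttl)
            .map(|(_, bounds)| bounds.clone())
    }

    pub fn put(&self, stream: String, bounds: EventBounds) {
        self.entries
            .write()
            .unwrap()
            .insert(stream, (Instant::now(), bounds));
    }
}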


415-421: Consistent accessor usage here is good; consider applying uniformly across the module

This spot adopts PARSEABLE.storage().get_object_store(); elsewhere in this file we still have PARSEABLE.storage.get_object_store(). For readability and future refactors, consider standardizing on the accessor.

src/prism/logstream/mod.rs (2)

155-170: Good move: single storage hit for both timestamps with graceful degradation.

Fetching both first/latest event timestamps via the storage provider simplifies logic and avoids repeated calls. On failure, logging a warning and defaulting to (None, None) is acceptable for a best-effort field.

One small improvement: if storage errors are transient (e.g., eventual consistency), consider downgrading to debug for noisy environments or adding error kind to the log to help triage.


184-184: StreamInfo now includes latest_event_at — verify serialization contract.

Assuming StreamInfo is annotated with serde camelCase (latestEventAt), this maps cleanly to the API surface. Double-check cross-file consistency (HTTP handlers and any UI/clients) for the new field name.

src/storage/object_storage.rs (4)

885-915: Happy path reuse of a single manifest is good; add a quick short-circuit for empty manifests.

Current logic already handles empty manifest_list early. Consider short-circuiting when min/max manifest paths are identical (you already do). No further action required beyond the min/max selection fix.


925-957: Consider using existing get_manifest instead of reaching through DataFusion registry.

load_manifest_from_path goes via QUERY_SESSION and object_store_registry, which ties this helper to the query runtime and assumes the store is registered there with appropriate auth. You already have trait methods for fetching objects; if manifest_path can be translated to a RelativePath (or you can add a helper to do so), prefer using self.get_object(...) to keep concerns within ObjectStorage.

If absolute URLs are required, at least document the reliance on QUERY_SESSION registration and clarify thread-safety/perf expectations.


1068-1069: Return shape adjustment looks consistent with the upload pipeline.

process_parquet_files returning only manifest files simplifies the calling site. Make sure downstream snapshot update logic doesn’t rely on the removed status flag.


1084-1104: Concurrent upload path is OK; semaphore bound remains high but unchanged.

No issues; bound of 100 may be high for some backends — consider making it configurable if you see pressure in production.
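
If configurability is desired, one small option is to read the bound from an environment variable at startup; the variable name below is hypothetical:

use std::sync::Arc;
use tokio::sync::Semaphore;

fn upload_semaphore() -> Arc<Semaphore> {
    // hypothetical env var; falls back to the current hard-coded bound of 100
    let max_concurrent_uploads: usize = std::env::var("P_MAX_CONCURRENT_UPLOADS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(100);
    Arc::new(Semaphore::new(max_concurrent_uploads))
}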

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 87f499e and f88adf3.

📒 Files selected for processing (7)
  • src/handlers/http/cluster/mod.rs (2 hunks)
  • src/handlers/http/logstream.rs (3 hunks)
  • src/handlers/http/modal/ingest/ingestor_logstream.rs (2 hunks)
  • src/migration/mod.rs (1 hunks)
  • src/prism/logstream/mod.rs (2 hunks)
  • src/storage/mod.rs (1 hunks)
  • src/storage/object_storage.rs (8 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-07-28T17:10:39.448Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.448Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.

Applied to files:

  • src/migration/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/migration/mod.rs
  • src/handlers/http/modal/ingest/ingestor_logstream.rs
  • src/handlers/http/logstream.rs
  • src/prism/logstream/mod.rs
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-06-18T06:45:37.070Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.070Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/storage/object_storage.rs
🧬 Code Graph Analysis (5)
src/migration/mod.rs (1)
src/parseable/mod.rs (1)
  • storage (244-246)
src/handlers/http/modal/ingest/ingestor_logstream.rs (2)
src/parseable/mod.rs (1)
  • storage (244-246)
src/catalog/mod.rs (1)
  • remove_manifest_from_snapshot (454-495)
src/handlers/http/logstream.rs (1)
src/parseable/mod.rs (1)
  • storage (244-246)
src/prism/logstream/mod.rs (1)
src/parseable/mod.rs (1)
  • storage (244-246)
src/storage/object_storage.rs (3)
src/query/stream_schema_provider.rs (3)
  • schema (461-463)
  • manifest_items (1032-1059)
  • serde_json (513-513)
src/storage/field_stats.rs (1)
  • calculate_field_stats (79-130)
src/parseable/mod.rs (3)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
🔇 Additional comments (8)
src/handlers/http/modal/ingest/ingestor_logstream.rs (1)

54-66: Good: unwrap removed and proper error mapping + 204 response

  • Error path now returns a meaningful StreamError with 500 on failure from remove_manifest_from_snapshot.
  • Success returns 204 No Content, which matches the new contract for cluster retention cleanup.

LGTM.

src/handlers/http/logstream.rs (1)

365-366: latest_event_at wiring looks correct

StreamInfo now carries latest_event_at, sourced from storage; matches the new API contract.

src/storage/object_storage.rs (6)

73-74: Bringing in DEFAULT_TIMESTAMP_KEY is correct for fallback behavior.

This keeps behavior consistent when time_partition is unset.


135-141: OK to ignore stats calc result; trace on failure is appropriate.

The stats path runs best-effort and won’t block uploads. This matches prior decisions to keep field stats non-critical.


166-184: Async helper rename/shape makes sense; doc is clear.

Returning nothing and handling errors internally (trace) simplifies call sites. No functional issues spotted.


958-1012: Nice: safe timestamp extraction without unwraps.

Using DateTime::from_timestamp_millis and RFC3339 parsing with guards avoids introducing bogus epoch defaults. This aligns with prior guidance to skip invalid values instead of defaulting them.


1014-1030: Helper composition is clear; signatures look good.

No functional concerns; keeps the core routine focused.


1163-1174: Correct: remove staged files only for successfully uploaded entries.

This matches the desired retry semantics.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)

975-1019: Timestamp extraction correctly skips invalid values; previous epoch-default bug resolved.

Good job avoiding unwrap_or_default and only considering valid conversions/parses for min/max. This prevents skewed epoch results.

🧹 Nitpick comments (3)
src/storage/object_storage.rs (3)

877-889: Avoid Option::is_none_or to prevent MSRV surprises; use map_or instead.

is_none_or is relatively new and can cause MSRV bumps. map_or reads fine and is widely supported.

-        for &item in &all_manifest_items {
-            if first_manifest_item
-                .is_none_or(|cur: &ManifestItem| item.time_lower_bound < cur.time_lower_bound)
-            {
-                first_manifest_item = Some(item);
-            }
-            if latest_manifest_item
-                .is_none_or(|cur: &ManifestItem| item.time_upper_bound > cur.time_upper_bound)
-            {
-                latest_manifest_item = Some(item);
-            }
-        }
+        for &item in &all_manifest_items {
+            if first_manifest_item
+                .map_or(true, |cur: &ManifestItem| item.time_lower_bound < cur.time_lower_bound)
+            {
+                first_manifest_item = Some(item);
+            }
+            if latest_manifest_item
+                .map_or(true, |cur: &ManifestItem| item.time_upper_bound > cur.time_upper_bound)
+            {
+                latest_manifest_item = Some(item);
+            }
+        }

932-964: Make manifest path handling robust for absolute URLs (s3://, gs://) vs relative keys.

object_store::path::Path::parse typically expects a key relative to the store, not a full URL. If manifest_path is an absolute URL (it is created via absolute_url during upload), parsing directly may fail on some backends. Normalize the path by stripping the URL and passing only the key to get().

-        let path = object_store::path::Path::parse(manifest_path)
-            .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
+        // Support both absolute URLs (e.g., s3://bucket/prefix/file) and relative keys (prefix/file)
+        let path = match url::Url::parse(manifest_path) {
+            Ok(u) => {
+                // Use only the key part for the store
+                let key = u.path().trim_start_matches('/');
+                object_store::path::Path::parse(key)
+                    .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?
+            }
+            Err(_) => object_store::path::Path::parse(manifest_path)
+                .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?,
+        };

1142-1181: Unreachable branch in collect_upload_results; simplify and clarify cleanup semantics.

UploadResult.manifest_file is always Some on success and errors return Err, so the None branch is dead. Removing it clarifies intent and avoids confusion.

-            Ok(Ok(upload_result)) => {
-                if let Some(manifest_file) = upload_result.manifest_file {
-                    uploaded_files.push((upload_result.file_path, manifest_file));
-                } else {
-                    // File failed to upload, clean up
-                    if let Err(e) = remove_file(upload_result.file_path) {
-                        warn!("Failed to remove staged file: {e}");
-                    }
-                }
-            }
+            Ok(Ok(upload_result)) => {
+                // Success: record manifest and remove staged file later
+                if let Some(manifest_file) = upload_result.manifest_file {
+                    uploaded_files.push((upload_result.file_path, manifest_file));
+                }
+            }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f88adf3 and fa30263.

📒 Files selected for processing (1)
  • src/storage/object_storage.rs (9 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-06-18T06:45:37.070Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.070Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/storage/object_storage.rs
🧬 Code Graph Analysis (1)
src/storage/object_storage.rs (4)
src/query/stream_schema_provider.rs (3)
  • schema (461-463)
  • manifest_items (1032-1059)
  • serde_json (513-513)
src/storage/field_stats.rs (1)
  • calculate_field_stats (79-130)
src/parseable/mod.rs (3)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
src/parseable/streams.rs (1)
  • new (114-131)
🔇 Additional comments (5)
src/storage/object_storage.rs (5)

76-93: UploadContext fields addition is sound and simplifies downstream calls.

Caching custom_partition and schema in UploadContext is a good move; it reduces repeated lookups and clarifies ownership.


136-137: Synchronous field-stats trigger preserved (good with staging deletion).

Keeping stats calculation in the upload path before deleting staged files aligns with the prior invariant that files must exist locally when stats are computed.


168-185: calculate_stats_if_enabled: best-effort behavior is appropriate.

Dropping the boolean return and logging errors at trace keeps ingestion resilient without noisy logs. No callers expect the bool anymore, which simplifies the flow.


877-904: Single-pass min/max selection without moving values: nice fix.

Using references and a single pass avoids the prior move/clone issue and is more efficient.


892-893: Ignore: get_time_partition returns the column name, not a limit

The get_time_partition() API returns an Option<String> for the name of the timestamp/partition column, not a numeric time limit. There is no get_timestamp_key() accessor on Stream, and DEFAULT_TIMESTAMP_KEY is exactly the right fallback when no column override is provided. You can safely ignore this suggestion.

Likely an incorrect or invalid review comment.

coderabbitai[bot]
coderabbitai bot previously approved these changes Aug 20, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
src/prism/home/mod.rs (4)

43-51: Prefer a named struct over a 4-tuple for readability and future evolution

The tuple has grown to four elements and is getting unwieldy. A dedicated struct (e.g., StreamMetadataAggregate) will make call sites self-documenting and reduce ordering mistakes when adding/removing fields.

If you want to keep it lightweight now, consider at least using the alias consistently at call sites (see suggestion below for get_stream_metadata signature).
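
A sketch of what that struct could look like, with field names chosen here purely for illustration:

pub struct StreamMetadataAggregate {
    pub stream: String,
    pub metadata: Vec<ObjectStoreFormat>,
    pub dataset_type: TelemetryType,
    pub time_partition: Option<String>,
}

// get_stream_metadata(stream) would then return Result<StreamMetadataAggregate, PrismHomeError>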


61-67: Align JSON casing with other APIs by using camelCase

Nice addition. To keep the Home API consistent with the rest (StreamInfo moved to camelCase), consider emitting camelCase here as well.

Apply:

 #[derive(Debug, Serialize)]
-pub struct DataSet {
+#[serde(rename_all = "camelCase")]
+pub struct DataSet {
   title: String,
   dataset_type: TelemetryType,
   #[serde(skip_serializing_if = "Option::is_none")]
   time_partition: Option<String>,
 }

215-225: Use the StreamMetadataResponse alias in the signature for maintainability

You already introduced a type alias; using it here will keep the function signature concise and future-proof if the payload changes again.

-async fn get_stream_metadata(
-    stream: String,
-) -> Result<
-    (
-        String,
-        Vec<ObjectStoreFormat>,
-        TelemetryType,
-        Option<String>,
-    ),
-    PrismHomeError,
-> {
+async fn get_stream_metadata(stream: String) -> StreamMetadataResponse {

255-257: Don’t assume the first manifest carries time_partition; pick the first non-empty across manifests

In mixed or evolving deployments, earlier manifests might not have time_partition populated while later ones do. Prefer the first non-empty value to avoid incorrectly returning None.

-    let time_partition = stream_jsons[0].time_partition.clone();
-
-    Ok((stream, stream_jsons, dataset_type, time_partition))
+    let time_partition = stream_jsons
+        .iter()
+        .find_map(|m| m.time_partition.clone());
+
+    Ok((stream, stream_jsons, dataset_type, time_partition))

Optionally, you might also sanity-check that telemetry_type is consistent across manifests and log a warn if it diverges.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between fa30263 and dc4ba4a.

📒 Files selected for processing (1)
  • src/prism/home/mod.rs (6 hunks)
🔇 Additional comments (1)
src/prism/home/mod.rs (1)

132-146: Propagating time_partition into DataSet looks correct

You’re passing through the extracted time_partition and preserving internal-stream filtering semantics. Looks good.

coderabbitai[bot]
coderabbitai bot previously approved these changes Aug 20, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
src/prism/home/mod.rs (4)

134-149: Using first stream.json for dataset_type/time_partition — consider consistency check across files

If multiple stream.json files exist and can diverge, silently taking index 0 may hide inconsistencies. Optional: assert they’re uniform and log if not, while still proceeding.

Apply within this block:

-                stream_wise_stream_json.insert(stream.clone(), metadata);
-                datasets.push(DataSet {
-                    title: stream,
-                    dataset_type,
-                    time_partition,
-                });
+                // Optional: sanity-check uniformity across metadata entries
+                let mixed_types = metadata.iter().any(|m| m.telemetry_type != dataset_type);
+                let mixed_partitions = metadata.iter().any(|m| m.time_partition != time_partition);
+                if mixed_types || mixed_partitions {
+                    tracing::warn!(
+                        stream = %stream,
+                        mixed_types,
+                        mixed_partitions,
+                        "Non-uniform stream metadata detected; using the first entry"
+                    );
+                }
+                stream_wise_stream_json.insert(stream.clone(), metadata);
+                datasets.push(DataSet {
+                    title: stream,
+                    dataset_type,
+                    time_partition,
+                });

Add at top of the file if needed:

use tracing::warn;

257-260: time_partition derived from first metadata entry — align with optional uniformity check

This follows the “first entry” approach; if you adopt the uniformity check above, this remains fine. Otherwise, be aware of potential divergence across entries.


160-165: Avoid cloning the entire stream metadata map per date (perf nit)

Cloning the full HashMap for each day multiplies memory traffic. Pass by reference instead; join_all supports non-'static futures with borrows.

Use these diffs:

-    let stats_futures = dates
-        .iter()
-        .map(|date| stats_for_date(date.clone(), stream_wise_stream_json.clone()));
-    let stats_results: Vec<Result<DatedStats, PrismHomeError>> =
-        futures::future::join_all(stats_futures).await;
+    let stats_futures = dates
+        .iter()
+        .map(|date| stats_for_date(date.clone(), &stream_wise_stream_json));
+    let stats_results: Vec<Result<DatedStats, PrismHomeError>> =
+        futures::future::join_all(stats_futures).await;

And update the callee signature accordingly (see below).


262-265: Match signature to accept a reference to avoid clones

Change stats_for_date to borrow the map.

-async fn stats_for_date(
-    date: String,
-    stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
-) -> Result<DatedStats, PrismHomeError> {
+async fn stats_for_date(
+    date: String,
+    stream_wise_meta: &HashMap<String, Vec<ObjectStoreFormat>>,
+) -> Result<DatedStats, PrismHomeError> {

The function body remains the same; iteration over stream_wise_meta.values() continues to work.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between dc4ba4a and cd6cb62.

📒 Files selected for processing (2)
  • src/alerts/alert_structs.rs (1 hunks)
  • src/prism/home/mod.rs (6 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/prism/home/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (7)
src/alerts/alert_structs.rs (3)

415-419: alert_info → alertInfo rename: double-check client expectations

This will change the JSON field name in responses. Confirm that the front-end or any API consumers reading alert_info are updated to expect alertInfo. If you need to remain backward-compatible on incoming payloads, adding #[serde(alias = "alert_info")] in a Deserialize-implementing struct is an option.


422-427: LGTM: AlertsInfo serde camelCase

No functional differences here since field names are already lower-case; the attribute future-proofs new fields.


406-412: Confirm downstream consumers and consider serde aliases for legacy keys

Renaming AlertsSummary with #[serde(rename_all = "camelCase")] means that the not_triggered field will now serialize as "notTriggered" (previously "not_triggered"). This is a breaking change for any UI, SDK, or documentation that consumes this payload.

  • Update all downstream clients, SDKs, and docs to expect notTriggered instead of not_triggered (or not-triggered).
  • If you need to continue deserializing legacy payloads, you can add aliases to the field:
 #[serde(rename_all = "camelCase")]
 pub struct AlertsSummary {
     pub total: u64,
     pub triggered: AlertsInfoByState,
     pub disabled: AlertsInfoByState,
+    #[serde(alias = "not_triggered", alias = "not-triggered")]
     pub not_triggered: AlertsInfoByState,
 }

Would you like me to scan the repository for any other hard-coded references to these legacy keys and prepare a follow-up patch?

src/prism/home/mod.rs (4)

43-51: Type alias for StreamMetadataResponse improves readability

Clearer return type, consistent with the expanded tuple. Looks good.


62-68: DataSet gains optional time_partition with camelCase — good shape

#[serde(skip_serializing_if = "Option::is_none")] is the right call to avoid noisy nulls. Suitable for UI display.


217-227: get_stream_metadata tuple expansion is consistent and clear

Return signature aligns with the new DataSet fields; OK.


71-77: HomeResponse JSON keys updated to camelCase — internal code clean

I ran a global search for the legacy snake_case keys in all .ts, .js, and .json files and found no occurrences outside of src/prism/home/mod.rs.

• No internal front-end or API consumer code still references alerts_summary, stats_details, or top_five_ingestion.
• The JSON response will now use alertsSummary, statsDetails, and topFiveIngestion.

Please confirm any external UIs or third-party clients are updated to consume the new camelCase field names.

coderabbitai[bot]
coderabbitai bot previously approved these changes Aug 20, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
src/storage/object_storage.rs (2)

965-1020: Timestamp extraction handles invalid values correctly (thanks for fixing earlier unwrap_or_default issue)

  • Int stats: guarded by from_timestamp_millis returning Some.
  • String stats: guarded by RFC3339 parse; converted to Utc.

This aligns with the prior guidance to skip invalid values rather than default to epoch. Good job.


1142-1181: Do not delete staged files when no manifest is produced

Current branch removes the staged file if manifest_file is None. This violates the intended retry behavior for failures where we should keep files in staging. Also, "success-without-manifest" is suspicious; better to log and retain for retry rather than delete.

Apply this diff:

-                if let Some(manifest_file) = upload_result.manifest_file {
-                    uploaded_files.push((upload_result.file_path, manifest_file));
-                } else {
-                    // File failed to upload, clean up
-                    if let Err(e) = remove_file(upload_result.file_path) {
-                        warn!("Failed to remove staged file: {e}");
-                    }
-                }
+                if let Some(manifest_file) = upload_result.manifest_file {
+                    uploaded_files.push((upload_result.file_path, manifest_file));
+                } else {
+                    // No manifest produced; keep the staged file for retry
+                    warn!(
+                        "Upload of {:?} yielded no manifest; keeping staged file for retry",
+                        upload_result.file_path
+                    );
+                }

Early return on first error is retained (good) and aligns with previous guidance that failed files should remain in staging for retry.

🧹 Nitpick comments (8)
src/storage/object_storage.rs (3)

168-185: Prefer elevating log level for failed stats from trace to debug

Silent failures at trace can be missed in prod. Recommend debug for visibility without being noisy.

Apply this diff:

-        if let Err(err) =
-            calculate_field_stats(stream_name, path, schema, max_field_statistics).await
-        {
-            tracing::trace!(
+        if let Err(err) =
+            calculate_field_stats(stream_name, path, schema, max_field_statistics).await
+        {
+            tracing::debug!(
                 "Error calculating field stats for stream {}: {}",
                 stream_name,
                 err
             );
         }

932-964: Minor import cleanup in helper

Manifest is already imported at the top of the module. The local import duplicates it unnecessarily; retain only QUERY_SESSION to avoid redundant imports.

Apply this diff:

-        use crate::{catalog::manifest::Manifest, query::QUERY_SESSION};
+        use crate::query::QUERY_SESSION;

1170-1181: Parallel cleanup with rayon is acceptable, but optional

Using rayon for removing files is fine; the I/O cost likely dwarfs any CPU work anyway. If you want to avoid engaging a separate thread pool, a simple sequential loop would also suffice (a minimal comparison sketch follows). No action needed.
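For comparison, a sketch of both options; the staged file list here is a stand-in, not the actual type used in the upload path:

use rayon::prelude::*;
use std::fs::remove_file;
use std::path::PathBuf;
use tracing::warn;

fn cleanup_sequential(staged: &[PathBuf]) {
    for path in staged {
        if let Err(e) = remove_file(path) {
            warn!("Failed to remove staged file {path:?}: {e}");
        }
    }
}

fn cleanup_parallel(staged: &[PathBuf]) {
    // Fans the removals out over rayon's threadpool; for I/O-bound work the
    // benefit over the sequential loop is usually small.
    staged.par_iter().for_each(|path| {
        if let Err(e) = remove_file(path) {
            warn!("Failed to remove staged file {path:?}: {e}");
        }
    });
}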

src/event/format/json.rs (1)

41-44: Add a brief Rustdoc to clarify p_timestamp semantics

Minor nit to help future readers: document that p_timestamp is ingestion/processing time, not the event's source timestamp.

 impl Event {
-    pub fn new(json: Value, p_timestamp: DateTime<Utc>) -> Self {
+    /// Create a JSON event with explicit Parseable ingestion time (`p_timestamp`).
+    ///
+    /// Note: `p_timestamp` represents the time at ingestion into Parseable (server/ingestor time).
+    /// The event's source time (if any) is still derived from the payload when a time partition is configured.
+    pub fn new(json: Value, p_timestamp: DateTime<Utc>) -> Self {
         Self { json, p_timestamp }
     }
src/utils/json/mod.rs (1)

78-115: Making apply_generic_flattening_for_partition public is appropriate

This enables reusing the same flattening semantics outside this module (e.g., pstats emission), keeping behavior consistent with the ingestion path.

Consider adding a short doc comment (a sketch follows the list below) clarifying:

  • It may return 1..N flattened objects.
  • Validation is always enabled (true).
  • It forwards time_partition_limit as-is (no defaults here). Per earlier learnings, defaults are handled in flatten/validate_time_partition, so nothing else is needed here.
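A doc-comment sketch folding those points in; the signature and body are elided placeholders, not the actual function:

/// Flattens a JSON value for partition-aware processing.
///
/// - May return 1..N flattened objects for a single input value.
/// - Validation is always enabled (the internal flag is passed as `true`).
/// - `time_partition_limit` is forwarded as-is; the 30-day default is applied
///   later in `flatten::validate_time_partition`, so no defaulting happens here.
pub fn apply_generic_flattening_for_partition(/* existing parameters */) {
    // existing body
}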
src/storage/field_stats.rs (2)

95-101: Be careful failing hard on parquet-path datetime extraction

Currently, any mismatch in path format aborts stats computation. If deployment paths vary (common across filesystems and object stores), this will be fragile.

Recommend either:

  • downgrade to a warning and fall back to Utc::now() (still marking the event with ingestion time), or
  • return a sentinel timestamp and proceed.

If strictness is desired, ensure the extractor handles both dotted and hierarchical path formats; otherwise, a minimal sketch of the fallback option follows below.
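A sketch of the "warn and fall back" option; names are hypothetical, and the strict extractor's result is taken as an input so its path-format handling stays unchanged:

use chrono::{DateTime, Utc};
use tracing::warn;

fn stats_timestamp(extracted: Option<DateTime<Utc>>, path_for_log: &str) -> DateTime<Utc> {
    extracted.unwrap_or_else(|| {
        // Could not derive a timestamp from the parquet path; mark the stats
        // event with ingestion time instead of aborting the computation.
        warn!("could not parse datetime from {path_for_log}; falling back to ingestion time");
        Utc::now()
    })
}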


146-166: Avoid schema fetch inside the loop and remove unwrap on size

Two nit-level improvements:

  • Fetch the stats stream schema once before iterating to avoid redundant lookups.
  • Replace unwrap() on serialization with proper error propagation to prevent panics on unexpected serde failures.
-    for json in vec_json {
-        let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
-        let schema = PARSEABLE
-            .get_stream(DATASET_STATS_STREAM_NAME)?
-            .get_schema_raw();
+    // Fetch schema once to avoid repeated lookups
+    let schema = PARSEABLE
+        .get_stream(DATASET_STATS_STREAM_NAME)?
+        .get_schema_raw();
+
+    for json in vec_json {
+        // Compute byte length safely; propagate serialization errors
+        let origin_size = serde_json::to_vec(&json)
+            .map_err(|e| PostError::Invalid(e.into()))?
+            .len() as u64; // byte length
src/connectors/kafka/processor.rs (1)

84-94: Attach ingestion time at creation — aligned with the new constructor

Supplying Utc::now() is consistent with the ingestion-time semantics for p_timestamp. Callers with access to upstream timestamps can optionally pass those instead.

If ConsumerRecord exposes a message timestamp, consider preferring it over Utc::now() for better fidelity (fall back to Utc::now() when absent).
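A sketch of that preference order; record_ts_millis stands in for whatever timestamp ConsumerRecord exposes, which is an assumption about its API:

use chrono::{DateTime, Utc};

fn event_timestamp(record_ts_millis: Option<i64>) -> DateTime<Utc> {
    // Prefer the message timestamp when present and representable; otherwise
    // fall back to ingestion time.
    record_ts_millis
        .and_then(DateTime::from_timestamp_millis)
        .unwrap_or_else(Utc::now)
}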

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between cd6cb62 and b2bd257.

📒 Files selected for processing (7)
  • src/connectors/kafka/processor.rs (2 hunks)
  • src/event/format/json.rs (1 hunks)
  • src/handlers/http/ingest.rs (13 hunks)
  • src/storage/field_stats.rs (6 hunks)
  • src/storage/object_storage.rs (9 hunks)
  • src/utils/json/mod.rs (1 hunks)
  • src/utils/mod.rs (0 hunks)
💤 Files with no reviewable changes (1)
  • src/utils/mod.rs
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-08-18T19:10:11.941Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/handlers/http/ingest.rs:163-164
Timestamp: 2025-08-18T19:10:11.941Z
Learning: Field statistics calculation in src/storage/field_stats.rs uses None for the time_partition parameter when calling flatten_and_push_logs(), as field stats generation does not require time partition functionality.

Applied to files:

  • src/storage/field_stats.rs
  • src/storage/object_storage.rs
📚 Learning: 2025-06-18T11:15:10.836Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:31-41
Timestamp: 2025-06-18T11:15:10.836Z
Learning: DataFusion's parquet reader defaults to using view types (Utf8View, BinaryView) when reading parquet files via the schema_force_view_types configuration (default: true). This means StringViewArray and BinaryViewArray downcasting is required when processing Arrow arrays from DataFusion parquet operations, even though these types are behind nightly feature flags.

Applied to files:

  • src/storage/field_stats.rs
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/utils/json/mod.rs
  • src/storage/object_storage.rs
📚 Learning: 2025-06-18T06:45:37.070Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/storage/object_storage.rs:832-843
Timestamp: 2025-06-18T06:45:37.070Z
Learning: Stats calculation for parquet files in Parseable is done synchronously during the upload process because files are deleted from staging after upload. This prevents race conditions and ensures stats are calculated while files are still available locally.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.

Applied to files:

  • src/storage/object_storage.rs
🧬 Code Graph Analysis (4)
src/storage/field_stats.rs (3)
src/handlers/http/ingest.rs (1)
  • ingest (55-130)
src/utils/json/mod.rs (1)
  • apply_generic_flattening_for_partition (78-115)
src/event/format/json.rs (1)
  • new (42-44)
src/handlers/http/ingest.rs (1)
src/event/format/json.rs (1)
  • new (42-44)
src/connectors/kafka/processor.rs (1)
src/event/format/json.rs (1)
  • new (42-44)
src/storage/object_storage.rs (3)
src/parseable/mod.rs (4)
  • storage (244-246)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
src/storage/field_stats.rs (1)
  • calculate_field_stats (88-168)
src/query/stream_schema_provider.rs (3)
  • schema (461-463)
  • manifest_items (1032-1059)
  • serde_json (513-513)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (16)
src/storage/object_storage.rs (10)

48-48: Import of ManifestItem is appropriate

Used in min/max manifest selection below. No concerns.


65-67: Importing DATASET_STATS_STREAM_NAME for exclusion is correct

Used in calculate_stats_if_enabled to avoid self-reporting. Matches prior behavior.


74-75: Good: DEFAULT_TIMESTAMP_KEY fallback

Using DEFAULT_TIMESTAMP_KEY when no custom time partition is present is sensible and consistent.


79-94: UploadContext now carries custom_partition and schema

This reduces parameter plumbing for spawned tasks and centralizes stream context. Looks good.


136-142: Stats calculation remains synchronous pre-cleanup (good)

calculate_stats_if_enabled is awaited before returning, keeping alignment with the invariant that staging files must be present while stats are computed.


892-931: API shape: Returning RFC3339 strings is pragmatic

Converting DateTime values to RFC3339 strings here simplifies downstream serialization. The fallback to DEFAULT_TIMESTAMP_KEY when a stream handle isn't available is a good safeguard.
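For illustration, a sketch of the shape this enables at the API boundary (function name is hypothetical):

use chrono::{DateTime, Utc};

fn to_rfc3339_pair(
    first: Option<DateTime<Utc>>,
    latest: Option<DateTime<Utc>>,
) -> (Option<String>, Option<String>) {
    // Optional timestamps become optional RFC3339 strings, ready to drop into
    // the stream info response without further conversion.
    (first.map(|ts| ts.to_rfc3339()), latest.map(|ts| ts.to_rfc3339()))
}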


1021-1037: Helper separation is clean

extract_timestamp_from_manifest composes the two helpers nicely and keeps IO outside the extraction logic.


1075-1076: Refactor to return manifest files only is coherent

process_parquet_files returning a Vec of manifest files simplifies the upload pipeline and snapshot update. No issues.


1213-1226: stream_relative_path custom partition handling looks correct

Adjusting the path slice count by custom_partition field count preserves expected folder layout.


841-931: No MSRV issue with Option::is_none_or

Your Cargo.toml already declares

rust-version = "1.88.0"

and Option::is_none_or was stabilized in Rust 1.82.0, well below that MSRV, so you can safely keep using it; no need to fall back to map_or.

Optional: you could eliminate the intermediate Vec<&ManifestItem> by iterating the manifest_list entries directly. For example:

let mut first_manifest_item: Option<&ManifestItem> = None;
let mut latest_manifest_item: Option<&ManifestItem> = None;

for stream_format in &stream_jsons {
    for item in &stream_format.snapshot.manifest_list {
        if first_manifest_item.is_none_or(|cur| item.time_lower_bound < cur.time_lower_bound) {
            first_manifest_item = Some(item);
        }
        if latest_manifest_item.is_none_or(|cur| item.time_upper_bound > cur.time_upper_bound) {
            latest_manifest_item = Some(item);
        }
    }
}

This avoids the extra Vec allocation but is entirely optional.

Likely an incorrect or invalid review comment.

src/event/format/json.rs (2)

42-43: Explicit ingestion timestamp in constructor is a good API improvement

Requiring callers to supply p_timestamp makes ingestion time explicit and testable. Call sites can now choose the right source of truth (e.g., transport-provided timestamp vs system clock).


42-43: Verify all call sites updated to two-arg Event::new

I ran a quick scan for fully-qualified single-argument calls:

json::Event::new(<one-arg>)
format::json::Event::new(<one-arg>)

and found none. To be certain there are no remaining single-arg invocations, please also:

  • Search for unqualified calls (e.g. Event::new(<one-arg>)) that may resolve to the JSON Event type via use imports.
  • Check for wildcard or renamed imports of Event in other modules.
  • Ensure every Event::new invocation now passes both json and p_timestamp.

You can try:

rg -nP --type=rust '\bEvent::new\s*\(\s*[^,()]+\s*\)' -g '*.rs'

Once you’ve confirmed no single-arg calls remain, this change will be safe to merge.

src/storage/field_stats.rs (2)

61-63: Good: centralized internal stats stream constants

Moving the stream name here and defining a dedicated custom partition key (dataset_name) clarifies ownership and intent for internal stats.


137-142: Correct use of the new public flattening helper

Using apply_generic_flattening_for_partition to explode/flatten the stats payload before emitting per-record events matches ingestion behavior and ensures partition fields exist on each emitted record.

src/handlers/http/ingest.rs (2)

140-151: Internal stream ingestion updated correctly to pass ingestion time

Using Utc::now() for p_timestamp is appropriate here since internal streams are produced locally and don’t carry a source timestamp.


525-525: Tests updated to the 2-arg json::Event::new signature

All impacted tests now pass Utc::now(); this keeps them focused on record batch shape/typing without coupling to timestamps.

Also applies to: 566-574, 600-608, 638-641, 671-675, 690-692, 716-724, 770-778, 825-828, 905-913, 983-991

@nitisht nitisht merged commit f0e7a17 into parseablehq:main Aug 20, 2025
13 checks passed